Connectivity Software User's Guide and Reference
Sparkplug Data Source Connections
Rapid Toolkit for Sparkplug > Concepts > Developing Sparkplug Edge Nodes > Sparkplug Data Source Connections
In This Topic

Introduction

In order for the Sparkplug edge node or device to be useful, it needs to have some kind of logical connection to its underlying system, so that it can expose metrics that reflect the data in that underlying system. How this is done differes from project to project, and is up to your code to provide the actual implementation. We call the connection of the Sparkplug edge ndoe or device to the underlying system a data source connection. Rapid Toolkit for Sparkplug defines the plumbing to your code, i.e. it tells your code when to connect and disconnect the data source, and provides means through which you can report successful and failed connections, and disconnections. You can also influence the conditions under which the data source is connected.

A Sparkplug edge node has two connections: The first to the Sparkplug system (MQTT broker), and the second to its data source. A Sparkplug device has one connection: to its data source; it relies on its edge node for connection to the Sparkplug system.

In Rapid Toolkit for Sparkplug, each edge node and each device conceptually has its own data source, and the toolkit provides means for your code to define when and how the data source is connected and disconnected. You can completely ignore this mechanism and control the communication with your underlying system in your own way. Using the mechanism provided by the toolkit, however, allows to implement some useful features, such as connecting to your data source only when the edge node or device is actually publishing the data.

Device Birth and Death

A Sparkplug device publishes Birth and Death messages to indicate its status to the Sparkplug system, and provide initial data and metadata of its metrics. Rapid Toolkit for Sparkplug takes care of publishing the Birth and Death messages automatically, making sure that the correct sequence is always followed. For example, the devices cannot be "born" unless their parent edge node is online, and the edge node Birth message must be published first, and only then the device Birth messages can follow.

In Rapid Toolkit for Sparkplug, the device Birth message is published when the parent edge node is online, and the data source for that device successfully connects. The device Death message is published when the data source for the device disconnects. The lifecycle of the device data source is thus always tied to the device birth and death.

In some cases, you may have a design which requires the Sparkplug device to publish some of its metrics even when there is no communication with physical device (that the Sparkplug device represents). This can be resolved in two ways:

  1. For the purposes of Rapid Toolkit for Sparkplug, "pretend" that the device data source is always successfully connected (this is easy to do, because it is the default behavior and no extra coding is needed). Handle the communication with the physical device outside of Rapid Toolkit for Sparkplug, in any way you like. You must consider what will be the behavior of Sparkplug metrics when the physical device is not available - come up with meaningful default values, status or quality indicators, etc., because these metrics will appear to Sparkplug consumers as always available in the device.
  2. Represent your physical device by two Sparkplug devices. The first one will contain the metrics that are available regardless of the communication status. No custom data source connection handling is needed for this Sparkplug device, and its birth and death will follow the birth and death of the edge node. The second Sparkplug device will contain the metrics that are only available through communication with the physical device, and your code will define custom data source connection handling for it. The births and deaths of this second Sparkplug device will follow the connection status of the data source connection (communuication status).
The birth and death of edge nodes is substantially different from that of devices. Edge node birth and death is not, and cannot be tied to the lifecycle of its data source connection. According to the Sparkplug specification, the edge node must publish its Death and Birth messages either in direct relation to its connection to the MQTT broker (when the edge node has no primary host application configured), or in direct relation to the online/offline status of the primary host application, when configured. Your code can use the behaviors and features provided by Rapid Toolkit for Sparkplug for edge node data source connection, but the edge node data source connections and disconnections do not have any influence on edge node births and deaths.

Custom Data Source Connection Handling

Data publishing by the edge node or device only takes place when its data source is connected. Your implementation needs to define what that means - and how the actual connections to and disconnection from the data source are performed. The behavior can be defined by hooking events handlers or overriding virtual methods. If you do not define it, the default behavior is that the data source connection is a "no operation" and it always succeeds.

You define how to connect to the data source by handling the ConnectDataSource Event on the edge node or device, or by overriding its OnConnectDataSource method. If the event is handled, your code is expected to attempt to connect to the data source (for example, a database or a PLC) and call either the DataSourceConnectionSuccess Method (for success) or the DataSourceConnectionFailure Method (for failure). This can be done synchronously (while handling the event) or asynchronously (e.g. by using a task or a thread). If the event is not handled, the data source is considered immediately successfully connected. Therefore, if you do not need an extra code to connect to your data source, you can rely on this default behavior and not handle the event at all.

You define how to disconnect from the data source by handling the DisconnectDataSource Event on the edge node or device, or by overriding its OnDisconnectDataSource method. If the event is handled, your code is expected to disconnect from the data source (for example, a database or a PLC) and call the DataSourceDisconnectionSuccess Method (the disconnection must always "succeed"). This can be done synchronously (while handling the event) or asynchronously (e.g. by using a task or a thread). If the event is not handled, the data source is considered immediately successfully disconnected (as if your code had called the DataSourceDisconnectionSuccess Method). Therefore, if you do not need an extra code to disconnect from your data source, you can rely on this default behavior and not handle the event at all.

The following example illustrates a custom data source connection handling, with a connection that sometimes succeeds and sometimes fails.

.NET

// This example shows how to implement custom data source connection and disconnection handling for a Sparkplug device.
//
// You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
// program, to subscribe to the edge node data. 
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
// Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
// Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
// a commercial license in order to use Online Forums, and we reply to every post.

using OpcLabs.EasySparkplug;
using OpcLabs.EasySparkplug.OperationModel;
using System;
using Timer = System.Timers.Timer;

namespace SparkplugDocExamples.EdgeNode._SparkplugDevice
{
     class ConnectDataSource
    {
        static public void Main1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the edge node object.
            var edgeNode = new EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo");

            // Hook the SystemConnectionStateChanged event to handle system connection state changes.
            edgeNode.SystemConnectionStateChanged += (sender, eventArgs) =>
            {
                // Display the new connection state (such as when the connection to the broker succeeds or fails).
                Console.WriteLine($"{nameof(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}");
            };

            // Define a metric on the edge node providing random integers.
            var random = new Random();
            edgeNode.Metrics.Add(new SparkplugMetric("MyMetric1").ReadValueFunction(() => random.Next()));

            // Create a device.
            SparkplugDevice device = SparkplugDevice.CreateIn(edgeNode, "Device");

            // Hook the DataSourceConditionChanged event to handle data source connection state changes.
            device.DataSourceConditionChanged += (sender, eventArgs) =>
            {
                // Display the new data source condition.
                Console.WriteLine($"{nameof(SparkplugDevice.DataSourceConditionChanged)}: {eventArgs}");
            };

            // Hook the events to connect and disconnect the device data source.
            device.ConnectDataSource += DeviceOnConnectDataSource;
            device.DisconnectDataSource += DeviceOnDisconnectDataSource;

            // Define a metric on the device providing random integers.
            device.Metrics.Add(new SparkplugMetric("MyMetric2").ReadValueFunction(() => random.Next()));

            // Start the edge node.
            Console.WriteLine("The edge node is starting...");
            edgeNode.Start();

            Console.WriteLine("The edge node is started.");
            Console.WriteLine();

            // Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...");
            Console.ReadLine();

            // Stop the edge node.
            Console.WriteLine("The edge node is stopping...");
            edgeNode.Stop();

            Console.WriteLine("The edge node is stopped.");
        }

        /// <summary>
        /// Handles the <see cref="SparkplugDevice.ConnectDataSource"/> event.
        /// </summary>
        static private void DeviceOnConnectDataSource(object sender, SparkplugProducerProcessedEventArgs eventArgs)
        {
            // The event sender is the device.
            var device = (SparkplugDevice)sender;
            Console.WriteLine(nameof(device.ConnectDataSource));

            // Simulate a connection to the data source that takes 5 seconds to either succeed or fail.
            // In a real application, you would connect to the actual data source here.
            var timer = new Timer(5 * 1000) { AutoReset = false };
            timer.Elapsed += (s, e) =>
            {
                if (Random.Next(0, 3) == 0)
                {
                    // Simulate a successful connection to the data source. Reconnect after 10 seconds.
                    device.DataSourceConnectionSuccess();
                }
                else
                {
                    // Simulate a failure to connect to the data source.
                    device.DataSourceConnectionFailure(
                        "Simulated connection failure.", 
                        10*1000);
                }
            };
            timer.Start();

            // Indicate that the event is handled. This is necessary to do, otherwise the default behavior would kick in,
            // and the data source would be considered immediately successfully connected.
            eventArgs.Handled = true; 
        }

        /// <summary>
        /// Handles the <see cref="SparkplugDevice.DisconnectDataSource"/> event.
        /// </summary>
        static private void DeviceOnDisconnectDataSource(object sender, SparkplugProducerProcessedEventArgs eventArgs)
        {
            // The event sender is the device.
            var device = (SparkplugDevice)sender;
            Console.WriteLine(nameof(device.DisconnectDataSource));
            
            // Simulate a disconnection from the data source that takes 5 seconds.
            // In a real application, you would disconnect from the actual data source here.
            var timer = new Timer(5 * 1000) { AutoReset = false };
            timer.Elapsed += (s, e) => device.DataSourceDisconnectionSuccess();
            timer.Start();

            // Indicate that the event is handled. This is necessary to do, otherwise the default behavior would kick in,
            // and the data source would be considered immediately successfully disconnected.
            eventArgs.Handled = true;
        }


        static private readonly Random Random = new Random();
    }
}
' This example shows how to implement custom data source connection and disconnection handling for a Sparkplug device.
'
' You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
' program, to subscribe to the edge node data.
'
' Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
' Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
' Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
' a commercial license in order to use Online Forums, and we reply to every post.

Imports OpcLabs.EasySparkplug
Imports OpcLabs.EasySparkplug.OperationModel
Imports Timer = System.Timers.Timer

Namespace Global.SparkplugDocExamples.EdgeNode._SparkplugDevice
    Class ConnectDataSource
        Public Shared Sub Main1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the edge node object.
            Dim edgeNode = New EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo")

            ' Hook the SystemConnectionStateChanged event to handle system connection state changes.
            AddHandler edgeNode.SystemConnectionStateChanged,
                Sub(sender, eventArgs)
                    ' Display the new connection state (such as when the connection to the broker succeeds or fails).
                    Console.WriteLine($"{NameOf(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}")
                End Sub

            ' Define a metric providing random integers.
            Dim random = New Random()
            edgeNode.Metrics.Add(New SparkplugMetric("MyMetric").ReadValueFunction(Function() random.Next()))

            ' Create a device.
            Dim device As SparkplugDevice = SparkplugDevice.CreateIn(edgeNode, "Device")

            ' Hook the DataSourceConditionChanged event to handle data source connection state changes.
            AddHandler device.DataSourceConditionChanged,
                Sub(sender, eventArgs)
                    ' Display the new data source condition.
                    Console.WriteLine($"{NameOf(SparkplugDevice.DataSourceConditionChanged)}: {eventArgs}")
                End Sub

            ' Hook the events to connect and disconnect the device data source.
            AddHandler device.ConnectDataSource, AddressOf DeviceOnConnectDataSource
            AddHandler device.DisconnectDataSource, AddressOf DeviceOnDisconnectDataSource

            ' Define a metric on the device providing random integers.
            device.Metrics.Add(New SparkplugMetric("MyMetric2").ReadValueFunction(Function() random.Next()))

            ' Start the edge node.
            Console.WriteLine("The edge node is starting...")
            edgeNode.Start()

            Console.WriteLine("The edge node is started.")
            Console.WriteLine()

            ' Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...")
            Console.ReadLine()

            ' Stop the edge node.
            Console.WriteLine("The edge node is stopping...")
            edgeNode.Stop()

            Console.WriteLine("The edge node is stopped.")
        End Sub

        ''' <summary>
        ''' Handles the <see cref="SparkplugDevice.ConnectDataSource"/> event.
        ''' </summary>
        Private Shared Sub DeviceOnConnectDataSource(ByVal sender As Object, ByVal eventArgs As SparkplugProducerProcessedEventArgs)
            ' The event sender is the device.
            Dim device = CType(sender, SparkplugDevice)
            Console.WriteLine(NameOf(device.ConnectDataSource))

            ' Simulate a connection to the data source that takes 5 seconds to either succeed or fail.
            ' In a real application, you would connect to the actual data source here.
            Dim timer = New Timer(5 * 1000) With {.AutoReset = False}
            AddHandler timer.Elapsed,
                Sub(s, e)
                    If Random.Next(0, 3) = 0 Then
                        ' Simulate a successful connection to the data source. Reconnect after 10 seconds.
                        device.DataSourceConnectionSuccess()
                    Else
                        ' Simulate a failure to connect to the data source.
                        device.DataSourceConnectionFailure(
                            "Simulated connection failure.",
                            10 * 1000)
                    End If
                End Sub
            timer.Start()

            ' Indicate that the event is handled. This is necessary to do, otherwise the default behavior would kick in,
            ' and the data source would be considered immediately successfully connected.
            eventArgs.Handled = True
        End Sub

        ''' <summary>
        ''' Handles the <see cref="SparkplugDevice.DisconnectDataSource"/> event.
        ''' </summary>
        Private Shared Sub DeviceOnDisconnectDataSource(ByVal sender As Object, ByVal eventArgs As SparkplugProducerProcessedEventArgs)
            ' The event sender is the device.
            Dim device = CType(sender, SparkplugDevice)
            Console.WriteLine(NameOf(device.DisconnectDataSource))

            ' Simulate a disconnection from the data source that takes 5 seconds.
            ' In a real application, you would disconnect from the actual data source here.
            Dim timer = New Timer(5 * 1000) With {.AutoReset = False}
            AddHandler timer.Elapsed, Sub(s, e) device.DataSourceDisconnectionSuccess()
            timer.Start()

            ' Indicate that the event is handled. This is necessary to do, otherwise the default behavior would kick in,
            ' and the data source would be considered immediately successfully disconnected.
            eventArgs.Handled = True
        End Sub

        Private Shared ReadOnly Random As Random = New Random()
    End Class
End Namespace

 

Data Source Connection Modes

The data source connection mode defines when and how the Sparkplug producer (edge node or device) connects and disconnects its data source. You can configure the data source connection mode using the DataSourceConnectionMode Property on the edge node or device object.

The choice of the data source connection mode depends on the nature of the implementation. The usual default setting (WhenComponentStarted) attempts to maintain the connection to the data source for the widest possible time. This means that you can e.g. keep collecting the data from the data source even when the connection to the MQTT broker is not available, store the data and publish them later. If you do not have this or similar requirements, you can consider choosing a "narrower" data source connection mode.

Following choices are available for the data source connection mode:

The example below connects the data source (and publishes data) only when the producer is online (with no primary host application set, this means that it connects to the data source only when it is connected to the MQTT broker).

.NET

// This example shows how to connect to the data source only when the producer is online.
//
// You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
// program, to subscribe to the edge node data. 
//
// Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
// Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
// Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
// a commercial license in order to use Online Forums, and we reply to every post.

using System;
using OpcLabs.EasySparkplug;

namespace SparkplugDocExamples.EdgeNode._EasySparkplugEdgeNode
{
     class DataSourceConnectionMode
    {
        static public void Main1()
        {
            // Note that the default port for the "mqtt" scheme is 1883.
            var hostDescriptor = new SparkplugHostDescriptor("mqtt://localhost");

            // Instantiate the edge node object.
            var edgeNode = new EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo");

            // Configure the edge node to connect to the data source only when the producer is online. You can test it out
            // e.g. by stopping or disconnecting from the MQTT broker.
            //
            // You can compare the output of this example with and without the statement below. With the statement, the
            // Poll event will not be raised unless the producer is online. Without the statement, the Poll event will
            // always be raised while the edge node is started, regardless of the MQTT broker connection state.
            edgeNode.DataSourceConnectionMode = SparkplugDataSourceConnectionMode.WhenProducerOnline;

            // Hook the SystemConnectionStateChanged event to handle system connection state changes.
            edgeNode.SystemConnectionStateChanged += (sender, eventArgs) =>
            {
                // Display the new connection state (such as when the connection to the broker succeeds or fails).
                Console.WriteLine($"{nameof(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}");
            };

            // Hook the ProducerOnlineChanged event to handle changes in the online state of the producer.
            edgeNode.ProducerOnlineChanged += (sender, eventArgs) =>
            {
                // Display the new producer online state.
                Console.WriteLine($"{nameof(EasySparkplugEdgeNode.ProducerOnlineChanged)}: {edgeNode.ProducerOnline}");
            };

            // Hook the Poll event (but do not mark the polling as processed by ourselves).
            edgeNode.Poll += (sender, eventArgs) =>
            {
                // Display when the component is polling for new data.
                Console.WriteLine(nameof(EasySparkplugEdgeNode.Poll));
            };

            // Define a metric providing random integers.
            var random = new Random();
            edgeNode.Metrics.Add(new SparkplugMetric("MyMetric").ReadValueFunction(() => random.Next()));

            // Start the edge node.
            Console.WriteLine("The edge node is starting...");
            edgeNode.Start();

            Console.WriteLine("The edge node is started.");
            Console.WriteLine();

            // Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...");
            Console.ReadLine();

            // Stop the edge node.
            Console.WriteLine("The edge node is stopping...");
            edgeNode.Stop();

            Console.WriteLine("The edge node is stopped.");
        }
    }
}
' This example shows how to connect to the data source only when the producer is online.
'
' You can use any Sparkplug application, including our SparkplugCmd utility and the SparkplugApplicationConsoleDemo
' program, to subscribe to the edge node data.
'
' Find all latest examples here: https://opclabs.doc-that.com/files/onlinedocs/OPCLabs-ConnectivityStudio/Latest/examples.html .
' Sparkplug examples in C# on GitHub: https://github.com/OPCLabs/Examples-ConnectivityStudio-CSharp .
' Missing some example? Ask us for it on our Online Forums, https://www.opclabs.com/forum/index ! You do not have to own
' a commercial license in order to use Online Forums, and we reply to every post.

Imports OpcLabs.EasySparkplug

Namespace Global.SparkplugDocExamples.EdgeNode._EasySparkplugEdgeNode
    Class DataSourceConnectionMode
        Public Shared Sub Main1()
            ' Note that the default port for the "mqtt" scheme is 1883.
            Dim hostDescriptor = New SparkplugHostDescriptor("mqtt://localhost")

            ' Instantiate the edge node object.
            Dim edgeNode = New EasySparkplugEdgeNode(hostDescriptor, "easyGroup", "easySparkplugDemo")

            ' Configure the edge node to connect to the data source only when the producer is online. You can test it out
            ' e.g. by stopping or disconnecting from the MQTT broker.
            ' 
            ' You can compare the output of this example with and without the statement below. With the statement, the
            ' Poll event will not be raised unless the producer is online. Without the statement, the Poll event will
            ' always be raised while the edge node is started, regardless of the MQTT broker connection state.
            edgeNode.DataSourceConnectionMode = SparkplugDataSourceConnectionMode.WhenProducerOnline

            ' Hook the SystemConnectionStateChanged event to handle system connection state changes.
            AddHandler edgeNode.SystemConnectionStateChanged,
                Sub(sender, eventArgs)
                    ' Display the new connection state (such as when the connection to the broker succeeds or fails).
                    Console.WriteLine($"{NameOf(EasySparkplugEdgeNode.SystemConnectionStateChanged)}: {eventArgs}")
                End Sub

            ' Hook the ProducerOnlineChanged event to handle changes in the online state of the producer.
            AddHandler edgeNode.ProducerOnlineChanged,
                Sub(sender, eventArgs)
                    ' Display the new producer online state.
                    Console.WriteLine($"{NameOf(EasySparkplugEdgeNode.ProducerOnlineChanged)}: {edgeNode.ProducerOnline}")
                End Sub

            ' Hook the Poll event (but do not mark the polling as processed by ourselves).
            AddHandler edgeNode.Poll,
                Sub(sender, eventArgs)
                    ' Display when the component Is polling for New data.
                    Console.WriteLine(NameOf(EasySparkplugEdgeNode.Poll))
                End Sub

            ' Define a metric providing random integers.
            Dim random = New Random()
            edgeNode.Metrics.Add(New SparkplugMetric("MyMetric").ReadValueFunction(Function() random.Next()))

            ' Start the edge node.
            Console.WriteLine("The edge node is starting...")
            edgeNode.Start()

            Console.WriteLine("The edge node is started.")
            Console.WriteLine()

            ' Let the user decide when to stop.
            Console.WriteLine("Press Enter to stop the edge node...")
            Console.ReadLine()

            ' Stop the edge node.
            Console.WriteLine("The edge node is stopping...")
            edgeNode.Stop()

            Console.WriteLine("The edge node is stopped.")
        End Sub
    End Class
End Namespace

 

 

Sparkplug is a trademark of Eclipse Foundation, Inc. "MQTT" is a trademark of the OASIS Open standards consortium. Other related terms are trademarks of their respective owners. Any use of these terms on this site is for descriptive purposes only and does not imply any sponsorship, endorsement or affiliation.